package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.IndexPutHbaseWindowFunction;
import cn.itcast.function.SecIndexWindowFunction;
import cn.itcast.inter.ProcessDataInterface;
import cn.itcast.keyBy.KeySelectFunction;
import cn.itcast.sink.SinkHbase;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.hadoop.hbase.client.Put;

import java.util.List;

public class IndexQuotSecTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> waterData) {
//        1.数据分组
    waterData.keyBy(new KeySelectFunction())
                //2.划分时间窗口
                .timeWindow(Time.seconds(5))
//        3.秒级数据处理（新建数据写入Bean和秒级窗口函数）
                .apply(new SecIndexWindowFunction())
//        4.数据写入操作
//                * 封装ListPuts
//                * 数据写入
                .timeWindowAll(Time.seconds(5))
                .apply(new IndexPutHbaseWindowFunction())

       .addSink(new SinkHbase(QuotConfig.config.getProperty("index.hbase.table.name")));
    }
}
